Audioデータをクラウドに送ってみました。 Amazon Kinesis Data Firehose + S3 (JSON)
1 はじめに
CX事業本部の平内(SIN)です。
前回、エッジ側のAudioデータをクラウドへ送信する要領を確認してみました。
上記は、MQTT -> Amazon Kinesis Data Firehose -> S3という形でしたが、今回は、単純にS3に保存する事だけが要件という想定で、IoT Coreを経由せず、直接 Amazon Kinesis Data Firehoseに送る要領を試してみました。
S3上の保存形式が変わらないようにしましたので、保存されたデータから時間を指定して、wavファイルとして取り出すLambdaは、前回のものが、そのまま利用可能です。
また、Audio入力も、前回と同じ、Webカメラ(C920)です。
2 エッジ側のコード
1秒に一回のタイミングで、AudioデータをAmazon Kinesis Data Firehoseに送信しているコードです。
サンプリングレートは、擬似的に32KHzから8Hzとし、チャンネルも1chに削除してますが、RAWデータは、テキスト化することで、16K/sec から 22K/sec程度になっています。
kinesisへのput_recordの際に、データの最後に、改行を入れることで、1秒のデータが1行になるようにしています。
権限は、サンプルということで、CognitoのIdentity Idの未認証のロールを使用しています。
index.py
import pyaudio from producer import Producer import numpy as np producer = Producer() DEVICE_INDEX = 0 CHANNELS = 2 SAMPLE_RATE = 32000 # サンプルレート FORMAT = pyaudio.paInt16 CHUNK = SAMPLE_RATE # 1秒ごとに取得する # open stream p = pyaudio.PyAudio() stream = p.open(format = FORMAT, channels = CHANNELS, rate = SAMPLE_RATE, input = True, input_device_index = DEVICE_INDEX, frames_per_buffer = CHUNK) try: print("start ...") while True: # 1秒分のデータ読み込み data = stream.read(CHUNK) # numpy配列に変換 data = np.frombuffer(data, dtype="int16") # チャンネル 2ch -> 1ch data = data[0::2] # サンプルレート 32000Hz -> 8000Hz data = data[0::4] # byteに戻す data = data.tobytes() # Amazon Kinesis Data Firehoseへの送信 producer.send(data) except: stream.stop_stream() stream.close() p.terminate()
producer.py
import json from datetime import datetime import base64 import boto3 from boto3.session import Session import time class Producer(): def __init__(self): self.__identity_id = "ap-northeast-1:xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx" self.__region = "ap-northeast-1" self.__delivery_stream_name = 'audio_transmission_firehose_stream' self.__firehose = self.__get_firehose() def __get_firehose(self): client = boto3.client('cognito-identity', self.__region) resp = client.get_id(IdentityPoolId = self.__identity_id) resp = client.get_credentials_for_identity(IdentityId=resp['IdentityId']) secretKey = resp['Credentials']['SecretKey'] accessKey = resp['Credentials']['AccessKeyId'] token = resp['Credentials']['SessionToken'] session = Session(aws_access_key_id = accessKey, aws_secret_access_key = secretKey, aws_session_token = token, region_name = self.__region) return session.client('firehose') def send(self, data): now = datetime.now().strftime('%Y-%m-%dT%H:%M:%S') b64_data = base64.b64encode(data) raw_data = b64_data.decode() data = { "timestamp": now, "raw_data": raw_data } try: response = self.__firehose.put_record( DeliveryStreamName=self.__delivery_stream_name, Record={ 'Data': "{}\n".format(json.dumps(data)) } ) except Exception as e: print("Exception: {}", e.args) print('put_record RecordId:{}'.format(response['RecordId']))
送信している様子です。
3 Amazon Kinesis Data Firehose
※ このセクションは、前回と同じ内容です。
Amazon Kinesis Data Firehoseは、audio_transmission_firehose_streamという名前で作成し、特に、コンバートや圧縮などは設定せず、destinationをS3バケットとしています。
なお、Buffer sizeは、10M、Buffer intervalは60secとしました。この設定により、1秒ごとに送られくるデータを、1分毎に1ファイルとしてS3に保存されることになります。
1秒分のデータは、約22Kなので、約1.4M(22k*60)以上のバッファにしておけば、Buffer Intervalが先にトリガーされるため、1分毎に出力されるという想定です。
もし、要件的に、あまりリアルタイム性が必要ない場合は、1ファイルに格納するデータ量を大きく取ることで、S3への書き込み回数は節約できることになります。
4 S3
※ このセクションは、前回と同じ内容です。
S3に出力されたデータです。年月日のプレフィックスと秒数まで入ったKey名で保存されていることが分かります。
60秒分のデータが格納された1つのファイルです。ここから対象秒のRAWデータが取り出せます。
1つのファイルを見ると、1秒ごとのデータが60行(60秒分)格納されているのが確認できます。ここから対象秒のRAWデータが取り出せます。
ここまでで、Audioデータの送信は完了となります。
5 Lambda
前回作成したLambdaが、そのまま利用可能です。
6 最後に
今回は、Amazon Kinesis Data Firehoseを使用してAudioデータをクラウドに送信する要領を試してみました。
最初に書いた通り、データに付随するアトリビュートなどが無く、単純にS3に保存するだけが要件ということであれば、これで十分かも知れません。
全てのコードは下記に置きました
https://github.com/furuya02/AudioTransmission/tree/main/sample_2